package fusion

import (
	"container/list"
	"sync"
	"sync/atomic"
	"time"
)

// DEF_RPC_TICK_SIZE is the number of buckets in RPCManager.tickObjs.
// A request whose expiry is e (Unix seconds) is stored in bucket
// e % DEF_RPC_TICK_SIZE, and OnTick scans one bucket per elapsed second.
// It is left untyped because it is used both as an array length and in
// int64 arithmetic.
const DEF_RPC_TICK_SIZE = 256

// RPCInvokeTimeout is a timeout value of 5. It is not referenced in this
// part of the file.
// NOTE(review): presumably the default `timeout` (in seconds) handed to
// RPCManager.AddReq by invoke helpers elsewhere - confirm against callers.
const RPCInvokeTimeout int64 = 5

// Status codes passed as the int32 argument of an RPC callback
// (func(*NetBuffer, int32, bool)). The numeric values are spelled out so
// that inserting or reordering entries cannot silently renumber them.
const (
	// RPCErrorNone is the zero value.
	// NOTE(review): presumably "no error" - not produced in this part of the file.
	RPCErrorNone int32 = 0
	// RPCErrorTimeout is what RPCManager.OnTick reports when a request's
	// expiry passes before its final reply arrives.
	RPCErrorTimeout int32 = 1
	// RPCErrorInterrupt is not produced in this part of the file.
	// NOTE(review): meaning inferred from the name only - confirm.
	RPCErrorInterrupt int32 = 2
	// RPCErrorCancel is not produced in this part of the file.
	// NOTE(review): presumably supplied by callers of CancelReq - confirm.
	RPCErrorCancel int32 = 3
	// RPCErrorFailed is not produced in this part of the file.
	// NOTE(review): meaning inferred from the name only - confirm.
	RPCErrorFailed int32 = 4
)

// RPCCBArgs bundles one set of RPC callback arguments so it can be queued.
// Its three fields correspond, in order, to the parameters of the callback
// signature func(*NetBuffer, int32, bool) and to the values returned by
// RPCBlockContext.WaitResp.
type RPCCBArgs struct {
	// pck is the response payload; callbacks fired by OnTick and CancelReq
	// pass nil here.
	pck *NetBuffer
	// err is the status code delivered with the response (see the RPCError*
	// constants).
	err int32
	// eof reports whether this is the final callback for the request.
	eof bool
}

// RPCBlockContext is a queue of pending callback results that a caller can
// block on via WaitResp.
//
// The code that fills args and signals cond is not in this part of the file.
// NOTE(review): cond must be initialised before WaitResp is called, and
// WaitResp only works if cond uses &mutex as its Locker - confirm at the
// construction site.
type RPCBlockContext struct {
	// args holds *RPCCBArgs values in arrival order; WaitResp pops from the front.
	args  list.List
	// mutex is held by WaitResp while it inspects and pops args.
	mutex sync.Mutex
	// cond is what WaitResp waits on while args is empty.
	cond  *sync.Cond
}

// RPCReq is the bookkeeping record for one outstanding RPC request tracked
// by RPCManager.
type RPCReq struct {
	// sn is the request's serial number, allocated by AddReq; it is the key
	// in RPCManager.reqObjs.
	sn     uint64
	// cb receives replies, and is also invoked with a nil packet when the
	// request times out or is cancelled.
	cb     func(*NetBuffer, int32, bool)
	// expiry is the tick value (Unix seconds) at or after which OnTick
	// treats the request as timed out.
	expiry int64
	// slot is the index of the RPCManager.tickObjs bucket holding this
	// request (expiry % DEF_RPC_TICK_SIZE).
	slot   int
	// elem is this request's element inside that bucket, kept so the request
	// can be unlinked without searching the list.
	elem   *list.Element
}

// RPCManager tracks outstanding RPC requests and times them out using a
// ring of DEF_RPC_TICK_SIZE buckets, one bucket per second.
//
// AddReq, CancelReq and Reply do not touch the maps and lists directly; they
// push closures onto tasks. The code that drains tasks and calls OnTick is
// not in this part of the file.
// NOTE(review): reqObjs, tickVal and tickObjs carry no locking, so the
// drained closures and OnTick presumably all run on one goroutine - confirm.
type RPCManager struct {
	// reqSN is the last serial number handed out; AddReq increments it
	// atomically.
	reqSN    uint64
	// reqObjs maps serial number to the pending request.
	reqObjs  map[uint64]*RPCReq
	// tickVal is the Unix-second timestamp of the next bucket OnTick will
	// scan. It starts at the manager's creation time.
	tickVal  int64
	// tickObjs are the expiry buckets; each list holds *RPCReq values whose
	// expiry % DEF_RPC_TICK_SIZE equals the bucket index.
	tickObjs [DEF_RPC_TICK_SIZE]list.List
	// tasks carries deferred state mutations queued by AddReq, CancelReq and
	// Reply.
	tasks    chan func()
}

// NewRPCManager builds an RPCManager with an empty request table, its tick
// position set to the current Unix second, and a task queue that can buffer
// n closures before senders block.
//
// n is passed straight to make(chan ...), so a negative n panics.
func NewRPCManager(n int) (mgr *RPCManager) {
	return &RPCManager{
		reqObjs: make(map[uint64]*RPCReq),
		tickVal: time.Now().Unix(),
		tasks:   make(chan func(), n),
	}
}

// OnTick advances the manager's tick position up to the current Unix second
// and times out every request that has expired along the way.
//
// For each elapsed second it scans the matching bucket; any request whose
// expiry is at or before that second is unlinked from the bucket, dropped
// from reqObjs, and has its callback invoked with (nil, RPCErrorTimeout,
// true). Requests in the same bucket with a later expiry (timeouts longer
// than one lap of the ring) are left in place.
func (mgr *RPCManager) OnTick() {
	now := time.Now().Unix()
	for mgr.tickVal < now {
		bucket := &mgr.tickObjs[mgr.tickVal%DEF_RPC_TICK_SIZE]
		cur := bucket.Front()
		for cur != nil {
			req := cur.Value.(*RPCReq)
			// Step forward before any removal: unlinking an element clears
			// its next pointer.
			cur = cur.Next()
			if req.expiry > mgr.tickVal {
				continue
			}
			bucket.Remove(req.elem)
			delete(mgr.reqObjs, req.sn)
			req.cb(nil, RPCErrorTimeout, true)
		}
		mgr.tickVal++
	}
}

// AddReq allocates a serial number for a new request and queues its
// registration on mgr.tasks.
//
// cb is invoked for each reply to the request, and with a nil packet if the
// request times out or is cancelled. timeout is in seconds, counted from the
// manager's tick position at the moment the queued registration runs.
// A negative timeout is treated as 0 (expire on the next tick). A timeout so
// large that the expiry would overflow int64 is saturated, which in practice
// means the request never times out.
//
// The returned sn is valid immediately, but the request only becomes visible
// to CancelReq and Reply once the queued closure has run. The send blocks
// while mgr.tasks is full.
func (mgr *RPCManager) AddReq(cb func(*NetBuffer, int32, bool), timeout int64) (sn uint64) {
	// Largest representable expiry; OnTick's tick position never reaches it.
	const maxExpiry int64 = 1<<63 - 1

	// A negative timeout would put the request in a bucket OnTick has already
	// passed, so it would only expire after the scan wraps around the ring
	// (up to DEF_RPC_TICK_SIZE seconds later).
	if timeout < 0 {
		timeout = 0
	}

	sn = atomic.AddUint64(&mgr.reqSN, 1)
	mgr.tasks <- func() {
		// Saturate rather than overflow: a wrapped, negative expiry would
		// produce a negative bucket index and an out-of-range panic.
		expiry := maxExpiry
		if timeout <= maxExpiry-mgr.tickVal {
			expiry = mgr.tickVal + timeout
		}
		reqObj := &RPCReq{sn: sn, cb: cb, expiry: expiry}
		reqObj.slot = int(expiry % DEF_RPC_TICK_SIZE)
		reqObj.elem = mgr.tickObjs[reqObj.slot].PushBack(reqObj)
		mgr.reqObjs[sn] = reqObj
	}
	return sn
}

// CancelReq queues the cancellation of request sn on mgr.tasks.
//
// When the queued closure runs and sn is still pending, the request is
// unlinked from its expiry bucket, removed from reqObjs, and its callback is
// invoked with (nil, err, true). If sn is unknown (already completed, timed
// out, or never registered) the closure does nothing. The send blocks while
// mgr.tasks is full.
func (mgr *RPCManager) CancelReq(sn uint64, err int32) {
	cancel := func() {
		req, pending := mgr.reqObjs[sn]
		if !pending {
			return
		}
		mgr.tickObjs[req.slot].Remove(req.elem)
		delete(mgr.reqObjs, req.sn)
		req.cb(nil, err, true)
	}
	mgr.tasks <- cancel
}

// Reply queues the delivery of a response packet on mgr.tasks.
//
// When the queued closure runs it looks up the request named by info.sn and,
// if it is still pending, invokes its callback with (pck, info.err,
// info.eof). A reply flagged eof also unlinks the request from its expiry
// bucket and from reqObjs first; a non-eof reply leaves the request
// registered with its original expiry. Replies for unknown serial numbers
// are dropped.
//
// info is read when the closure runs, not when Reply is called, so the
// caller must not modify it in the meantime. The send blocks while mgr.tasks
// is full.
func (mgr *RPCManager) Reply(pck *NetBuffer, info *RPCRespMetaInfo) {
	deliver := func() {
		req, pending := mgr.reqObjs[info.sn]
		if !pending {
			return
		}
		if info.eof {
			mgr.tickObjs[req.slot].Remove(req.elem)
			delete(mgr.reqObjs, req.sn)
		}
		req.cb(pck, info.err, info.eof)
	}
	mgr.tasks <- deliver
}

// WaitResp blocks until at least one callback result is queued in ctx.args,
// then pops the oldest one and returns its packet, status code and eof flag.
//
// NOTE(review): relies on ctx.cond being non-nil and built over &ctx.mutex,
// and on the producer signalling it after pushing to args - neither is
// visible in this part of the file.
func (ctx *RPCBlockContext) WaitResp() (*NetBuffer, int32, bool) {
	ctx.mutex.Lock()
	// Wait releases the lock while parked and reacquires it before
	// returning, so the queue is re-checked under the lock after every wakeup.
	for ctx.args.Len() == 0 {
		ctx.cond.Wait()
	}
	resp := ctx.args.Remove(ctx.args.Front()).(*RPCCBArgs)
	ctx.mutex.Unlock()
	return resp.pck, resp.err, resp.eof
}
